feat: expansion aware batching for explode #6069
universalmind303 wants to merge 1 commit into main from
Conversation
Greptile Summary

This PR introduces expansion-aware dynamic batching for the explode operator to prevent memory pressure from high-cardinality expansions. The implementation monitors the explode's expansion ratio (output rows / input rows) and dynamically reduces upstream batch size requirements proportionally, so that when an explode has 100x expansion, it requests ~100x fewer rows from upstream to produce appropriately sized downstream batches.

Key changes: the implementation includes safety bounds (

Confidence Score: 5/5
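The reactive mechanism the summary describes can be sketched as two small helpers: one estimating the expansion ratio from cumulative row counters, and one shrinking the upstream batch requirement proportionally, clamped to a floor so requests never collapse to zero. This is a minimal illustration of the idea; the function names, the `min_batch` floor, and the exact clamping are assumptions, not the PR's actual code.

```rust
// Hypothetical sketch of expansion-aware batch reduction; names are
// illustrative and do not come from the Daft codebase.

/// Estimate the expansion ratio from cumulative input/output row counts.
fn expansion_ratio(rows_in: u64, rows_out: u64) -> f64 {
    if rows_in == 0 {
        1.0 // no data yet: assume no expansion
    } else {
        rows_out as f64 / rows_in as f64
    }
}

/// Shrink the upstream batch requirement proportionally to the expansion,
/// clamped to a minimum so we always request at least `min_batch` rows.
fn reduced_requirement(target_batch: u64, ratio: f64, min_batch: u64) -> u64 {
    // Ratios below 1.0 (a shrinking operator) never *increase* the request here.
    let reduced = (target_batch as f64 / ratio.max(1.0)).floor() as u64;
    reduced.max(min_batch)
}

fn main() {
    // 1000 input rows exploded into 100_000 output rows => 100x expansion,
    // so a downstream requirement of Strict(1000) becomes a request for 10 rows.
    let ratio = expansion_ratio(1000, 100_000);
    let req = reduced_requirement(1000, ratio, 1);
    println!("ratio = {ratio}, new upstream requirement = {req}");
}
```

With a 100x expansion this turns a 1000-row requirement into a 10-row upstream request, matching the ~100x reduction described above.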
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Upstream as Upstream Operator
    participant BE as BatchManager
    participant EO as ExplodeOperator
    participant ES as ExpansionState
    participant Stats as ExplodeStats
    participant Downstream as Downstream Operator
    Note over BE: Initial requirement from<br/>downstream (e.g., Strict(1000))
    BE->>Upstream: Request initial batch<br/>(Strict(1000) rows)
    Upstream->>EO: Send 1000 rows
    EO->>EO: Explode operation<br/>(e.g., 100x expansion)
    EO->>Stats: Update rows_in=1000,<br/>rows_out=100000
    EO->>Downstream: Send 100000 rows
    Note over BE,ES: Record execution stats
    BE->>ES: record_execution_stat(stats)
    ES->>Stats: Load cumulative counters<br/>(rows_in, rows_out)
    ES->>ES: Calculate expansion ratio<br/>(100000/1000 = 100)
    ES->>ES: Apply EMA smoothing<br/>smoothed_expansion = 100
    Note over BE: Calculate new requirements
    BE->>ES: calculate_new_requirements()
    ES->>ES: reduction = 1/100 = 0.01
    ES->>BE: Return Strict(10)
    Note over BE: Next batch uses reduced size
    BE->>Upstream: Request reduced batch<br/>(Strict(10) rows)
    Upstream->>EO: Send 10 rows
    EO->>EO: Explode operation<br/>(~100x expansion)
    EO->>Stats: Update rows_in=1010,<br/>rows_out=101000
    EO->>Downstream: Send ~1000 rows
    Note over Downstream: Receives appropriately<br/>sized batches
```
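The "Apply EMA smoothing" step in the diagram can be sketched as folding each newly observed expansion ratio into a running exponential moving average, so a single outlier batch does not whipsaw the requested batch size. The `ALPHA` smoothing factor and the seeding of the first observation are assumptions for illustration, not values taken from the PR.

```rust
// Sketch of EMA smoothing of the expansion ratio; ALPHA is an assumed
// smoothing factor, not one from the actual implementation.
const ALPHA: f64 = 0.5;

/// Fold a newly observed expansion ratio into the running EMA estimate.
/// `None` means no batches have been observed yet.
fn smooth(previous: Option<f64>, observed: f64) -> f64 {
    match previous {
        None => observed, // first observation seeds the estimate
        Some(prev) => ALPHA * observed + (1.0 - ALPHA) * prev,
    }
}

fn main() {
    // Three batches with expansion ratios 100x, 80x, 120x.
    let mut estimate: Option<f64> = None;
    for observed in [100.0, 80.0, 120.0] {
        estimate = Some(smooth(estimate, observed));
    }
    println!("smoothed expansion = {:?}", estimate);
}
```

With ALPHA = 0.5 the estimate moves 100 → 90 → 105 across those three batches, tracking the trend while damping the swings.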
colin-ho left a comment
This looks fine, but I wonder if we can do something simpler that's more proactive instead of reactive.
For example, if the explode operator itself knew how many rows the downstream operator expects, it could choose how many rows of the input morsel to explode in order to hit that requirement. I believe it has this information from the offsets (or, if it's a fixed-size list, the list size).
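The proactive approach suggested here can be sketched against a list array's offset buffer: since `offsets[k] - offsets[0]` is the total exploded size of the first `k` input rows, the operator can take input rows while their cumulative exploded size fits the downstream target. This is a hypothetical illustration of the suggestion, not code from Daft.

```rust
// Sketch of proactively choosing how much of an input morsel to explode,
// using Arrow-style list offsets. Names are illustrative.

/// `offsets[i+1] - offsets[i]` is the list length of input row i, so
/// `offsets[k] - offsets[0]` is the exploded size of the first k rows.
/// Returns how many input rows to explode to stay within `target_output_rows`,
/// always taking at least one row so the operator makes progress.
fn rows_to_explode(offsets: &[i64], target_output_rows: i64) -> usize {
    let base = offsets[0];
    let mut k = 0;
    while k + 1 < offsets.len() && (offsets[k + 1] - base) <= target_output_rows {
        k += 1;
    }
    k.max(1)
}

fn main() {
    // Four input rows with list lengths 3, 5, 4, 10; downstream wants ~8 rows.
    let offsets = [0i64, 3, 8, 12, 22];
    // Exploding the first 2 rows yields exactly 3 + 5 = 8 output rows.
    println!("explode first {} input rows", rows_to_explode(&offsets, 8));
}
```

Note the `k.max(1)` fallback: a single input row whose list is larger than the target still gets exploded (possibly into an oversized batch) rather than stalling, unless the operator also supports splitting within a row.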
@universalmind303 are you still working on this?
Closing for now. Will try to revisit with the proactive approach suggested by @colin-ho.
Changes Made
This adds a new dynamic batching strategy for explodes. Since explodes can greatly increase output cardinality and potentially blow up memory, we can use our dynamic batching to inform the explode operator to back off. Similar to the earlier PR that increased batch sizes for low-cardinality filters, this PR is essentially the inverse of that, but for explodes.
See comment here #5924 (comment)
Related Issues